package com.innovation.ic.b1b.monitor.base.service.masterSlave.impl;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.innovation.ic.b1b.monitor.base.handler.KafkaAvailableJobHandler;
import com.innovation.ic.b1b.monitor.base.mapper.KafkaAvailableJobMapper;
import com.innovation.ic.b1b.monitor.base.model.Kafka;
import com.innovation.ic.b1b.monitor.base.model.KafkaAvailableJob;
import com.innovation.ic.b1b.monitor.base.pojo.constant.JobDataMapConstant;
import com.innovation.ic.b1b.monitor.base.pojo.constant.KafkaAvailableJobConstant;
import com.innovation.ic.b1b.monitor.base.pojo.constant.TableNameConstant;
import com.innovation.ic.b1b.monitor.base.pojo.enums.EnableEnum;
import com.innovation.ic.b1b.monitor.base.pojo.variable.ServiceResult;
import com.innovation.ic.b1b.monitor.base.pojo.variable.kafkaAvailableJob.KafkaAvailableJobInfoRespPojo;
import com.innovation.ic.b1b.monitor.base.pojo.variable.kafkaAvailableJob.KafkaAvailableJobListRespData;
import com.innovation.ic.b1b.monitor.base.service.masterSlave.KafkaAvailableJobService;
import com.innovation.ic.b1b.monitor.base.service.masterSlave.ServiceHelper;
import com.innovation.ic.b1b.monitor.base.vo.kafkaAvailableJob.KafkaAvailableJobAddVo;
import com.innovation.ic.b1b.monitor.base.vo.kafkaAvailableJob.KafkaAvailableJobListQueryVo;
import com.innovation.ic.b1b.monitor.base.vo.kafkaAvailableJob.KafkaAvailableJobUpdateVo;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobDataMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;

/**
 * @desc   Kafka可用任务表Service实现类
 * @author linuo
 * @time   2023年5月8日11:38:52
 */
@Slf4j
@Service
public class KafkaAvailableJobServiceImpl extends ServiceImpl<KafkaAvailableJobMapper, KafkaAvailableJob> implements KafkaAvailableJobService {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Resource
    private ServiceHelper serviceHelper;

    /**
     * 添加kafka监控任务
     * @param kafkaAvailableJobAddVo 添加kafka监控任务的Vo类
     * @return 返回添加结果
     */
    @Override
    public ServiceResult<Boolean> add(KafkaAvailableJobAddVo kafkaAvailableJobAddVo) {
        boolean result = Boolean.FALSE;
        String message;

        Boolean aBoolean = judgeIfHaveSameData(null, kafkaAvailableJobAddVo.getKafkaId());
        if(aBoolean){
            message = "当前kafka监控任务已存在，请勿重复添加";
        }else{
            // 校验kafkaId是否存在
            Kafka kafka = serviceHelper.getKafkaMapper().selectById(kafkaAvailableJobAddVo.getKafkaId());
            if(kafka != null){
                KafkaAvailableJob kafkaAvailableJob = new KafkaAvailableJob();
                BeanUtils.copyProperties(kafkaAvailableJobAddVo, kafkaAvailableJob);
                if(kafkaAvailableJob.getEnable() == null){
                    kafkaAvailableJob.setEnable(EnableEnum.NO.getCode());
                }
                kafkaAvailableJob.setCreateTime(new Date(System.currentTimeMillis()));
                int insert = serviceHelper.getKafkaAvailableJobMapper().insert(kafkaAvailableJob);
                if(insert > 0){
                    message = ServiceResult.INSERT_SUCCESS;
                    result = Boolean.TRUE;

                    String jobId = TableNameConstant.KAFKA_AVAILABLE_JOB + "_" + kafkaAvailableJob.getId();

                    // 组装任务参数
                    JobDataMap jobDataMap = new JobDataMap();
                    jobDataMap.put(JobDataMapConstant.ID, kafkaAvailableJob.getId());

                    serviceHelper.getQuartzManager().createScheduleJob(jobId, KafkaAvailableJobHandler.class.getName(), "KafkaAvailableJobHandler", jobDataMap, kafkaAvailableJob.getScheduleExpression());
                }else{
                    message = ServiceResult.INSERT_FAIL;
                }
            }else{
                message = "kafka信息不存在，请配置kafka信息后重新添加kafka监控任务";
            }
        }

        ServiceResult<Boolean> serviceResult = new ServiceResult<>();
        serviceResult.setSuccess(Boolean.TRUE);
        serviceResult.setResult(result);
        serviceResult.setMessage(message);
        return serviceResult;
    }

    /**
     * 查询kafka监控任务列表
     * @param kafkaAvailableJobListQueryVo 查询kafka监控任务列表的Vo类
     * @param isMain 是否查询主库
     * @return 返回查询结果
     */
    @Override
    public ServiceResult<PageInfo<KafkaAvailableJobListRespData>> queryList(KafkaAvailableJobListQueryVo kafkaAvailableJobListQueryVo, Boolean isMain) {
        PageHelper.startPage(kafkaAvailableJobListQueryVo.getPageNo(), kafkaAvailableJobListQueryVo.getPageSize());

        // 查询kafka可用任务列表数据
        List<KafkaAvailableJobListRespData> data = serviceHelper.getKafkaAvailableJobMapper().queryList(kafkaAvailableJobListQueryVo);
        PageInfo<KafkaAvailableJobListRespData> pageInfo = new PageInfo<>(data);

        return ServiceResult.ok(pageInfo, ServiceResult.SELECT_SUCCESS);
    }

    /**
     * 删除kafka监控任务
     * @param id kafka监控任务id
     * @return 返回删除结果
     */
    @Override
    public ServiceResult<Boolean> delete(Integer id) {
        boolean result = Boolean.FALSE;
        String message = ServiceResult.DELETE_FAIL;

        int i = serviceHelper.getKafkaAvailableJobMapper().deleteById(id);
        if(i > 0){
            logger.info("已删除id为[{}]kafka监控任务", id);
            result = Boolean.TRUE;
            message = ServiceResult.DELETE_SUCCESS;

            try {
                // 删除任务
                String jobId = TableNameConstant.KAFKA_AVAILABLE_JOB + "_" + id;
                serviceHelper.getQuartzManager().deleteScheduleJob(jobId);
            }catch (Exception e){
                logger.info("删除kafka监控后台任务失败,原因:", e);
            }
        }else{
            logger.info("删除id为[{}]kafka监控任务失败，数据不存在", id);
        }

        return ServiceResult.ok(result, message);
    }

    /**
     * 获取kafka监控任务详情
     * @param id  kafka监控任务id
     * @param isMain 是否查询主库
     * @return 返回查询结果
     */
    @Override
    public ServiceResult<KafkaAvailableJobInfoRespPojo> info(Integer id, Boolean isMain) {
        KafkaAvailableJobInfoRespPojo result = serviceHelper.getKafkaAvailableJobMapper().info(id);
        return ServiceResult.ok(result, ServiceResult.SELECT_SUCCESS);
    }

    /**
     * 更新kafka监控任务
     * @param kafkaAvailableJobUpdateVo 更新kafka监控任务的Vo类
     * @return 返回更新结果
     */
    @Override
    public ServiceResult<Boolean> update(KafkaAvailableJobUpdateVo kafkaAvailableJobUpdateVo) {
        Boolean aBoolean = judgeIfHaveSameData(kafkaAvailableJobUpdateVo.getId(), kafkaAvailableJobUpdateVo.getKafkaId());
        if(aBoolean){
            ServiceResult<Boolean> serviceResult = new ServiceResult<>();
            serviceResult.setSuccess(Boolean.TRUE);
            serviceResult.setResult(Boolean.FALSE);
            serviceResult.setMessage("当前kafka监控任务已存在，请勿重复添加");
            return serviceResult;
        }else{
            // 校验kafkaId是否存在
            Kafka kafka = serviceHelper.getKafkaMapper().selectById(kafkaAvailableJobUpdateVo.getKafkaId());
            if(kafka != null){
                KafkaAvailableJob historyData = serviceHelper.getKafkaAvailableJobMapper().selectById(kafkaAvailableJobUpdateVo.getId());

                int i = serviceHelper.getKafkaAvailableJobMapper().updateInfo(kafkaAvailableJobUpdateVo);
                if(i > 0){
                    logger.info("更新id为[{}]kafka监控任务成功", kafkaAvailableJobUpdateVo.getId());

                    Integer enable = kafkaAvailableJobUpdateVo.getEnable();
                    // 原来是启用现在关闭时需要暂停后台任务
                    if(historyData.getEnable().intValue() == EnableEnum.YES.getCode().intValue() && enable.intValue() == EnableEnum.NO.getCode().intValue()){
                        String jobId = TableNameConstant.KAFKA_AVAILABLE_JOB + "_" + kafkaAvailableJobUpdateVo.getId();
                        logger.info("暂停后台任务,jobId:[{}]", jobId);
                        serviceHelper.getQuartzManager().pauseScheduleJob(jobId);
                    }else if(historyData.getEnable().intValue() == EnableEnum.NO.getCode().intValue() && enable.intValue() == EnableEnum.YES.getCode().intValue()) {
                        // 原来是关闭现在启用任务时需要恢复后台任务
                        String jobId = TableNameConstant.KAFKA_AVAILABLE_JOB + "_" + kafkaAvailableJobUpdateVo.getId();
                        logger.info("恢复后台任务,jobId:[{}]", jobId);
                        serviceHelper.getQuartzManager().resumeScheduleJob(jobId);
                    }else{
                        String scheduleExpression = kafkaAvailableJobUpdateVo.getScheduleExpression();
                        if(kafkaAvailableJobUpdateVo.getEnable().intValue() == EnableEnum.YES.getCode().intValue() && !historyData.getScheduleExpression().equals(scheduleExpression)){
                            String jobId = TableNameConstant.KAFKA_AVAILABLE_JOB + "_" + kafkaAvailableJobUpdateVo.getId();
                            serviceHelper.getQuartzManager().updateScheduleJob(jobId, scheduleExpression);
                        }
                    }

                    return ServiceResult.ok(Boolean.TRUE, ServiceResult.UPDATE_SUCCESS);
                }else{
                    logger.info("更新id为[{}]kafka监控任务失败", kafkaAvailableJobUpdateVo.getId());
                    return ServiceResult.ok(Boolean.FALSE, ServiceResult.UPDATE_FAIL);
                }
            }else{
                logger.info("kafkaId=[{}]的信息不存在", kafkaAvailableJobUpdateVo.getKafkaId());
                return ServiceResult.error(ServiceResult.UPDATE_FAIL);
            }
        }
    }

    /**
     * 执行任务
     * @param id kafka监控任务id
     * @return 返回执行结果
     */
    @Override
    public ServiceResult<Boolean> executeJob(Integer id) {
        ServiceResult<Boolean> serviceResult = new ServiceResult<>();

        try {
            String jobId = TableNameConstant.KAFKA_AVAILABLE_JOB + "_" + id;
            serviceHelper.getQuartzManager().runOnce(jobId);
        }catch (Exception e){
            serviceResult.setMessage(ServiceResult.OPERATE_FAIL);
            serviceResult.setResult(Boolean.FALSE);
            serviceResult.setSuccess(Boolean.FALSE);
            return serviceResult;
        }

        serviceResult.setMessage(ServiceResult.OPERATE_SUCCESS);
        serviceResult.setResult(Boolean.TRUE);
        serviceResult.setSuccess(Boolean.TRUE);
        return serviceResult;
    }

    /**
     * 判断是否有相同数据
     * @param id 主键id
     * @param kafkaId kafkaId
     * @return 返回判断结果
     */
    private Boolean judgeIfHaveSameData(Integer id, Integer kafkaId){
        Boolean result = Boolean.FALSE;

        QueryWrapper<KafkaAvailableJob> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq(KafkaAvailableJobConstant.KAFKA_ID, kafkaId);
        if(id != null){
            queryWrapper.ne(KafkaAvailableJobConstant.ID, id);
        }
        Integer selectCount = serviceHelper.getKafkaAvailableJobMapper().selectCount(queryWrapper);
        if(selectCount > 0){
            result = Boolean.TRUE;
        }

        return result;
    }
}